Revert TwitterStreamAgent back to the twitter-stream gem

Dominik Sander лет %!s(int64=9): %!d(string=назад)
Родитель
Сommit
08028aa9ea
6 измененных файлов с 165 добавлено и 101 удалено
  1. 2 3
      Gemfile
  2. 20 0
      Gemfile.lock
  3. 2 3
      app/concerns/long_runnable.rb
  4. 66 39
      app/models/agents/twitter_stream_agent.rb
  5. 1 2
      lib/agent_runner.rb
  6. 74 54
      spec/models/agents/twitter_stream_agent_spec.rb

+ 2 - 3
Gemfile

@@ -25,6 +25,7 @@ gem "google-api-client", require: 'google/api_client'
25 25
 
26 26
 # Twitter Agents
27 27
 gem 'twitter', '~> 5.14.0' # Must to be loaded before cantino-twitter-stream.
28
+gem 'twitter-stream', github: 'cantino/twitter-stream', branch: 'huginn'
28 29
 gem 'omniauth-twitter'
29 30
 
30 31
 # Tumblr Agents
@@ -43,9 +44,6 @@ gem 'omniauth-37signals'          # BasecampAgent
43 44
 # gem 'omniauth-github'
44 45
 gem 'omniauth-wunderlist', github: 'wunderlist/omniauth-wunderlist', ref: 'd0910d0396107b9302aa1bc50e74bb140990ccb8'
45 46
 
46
-# Uncomment to use 'em_http' as FARADAY_HTTP_BACKEND
47
-# gem 'em-http-request', '~> 1.1.2'
48
-
49 47
 # Bundler <1.5 does not recognize :x64_mingw as a valid platform name.
50 48
 # Unfortunately, it can't self-update because it errors when encountering :x64_mingw.
51 49
 unless Gem::Version.new(Bundler::VERSION) >= Gem::Version.new('1.5.0')
@@ -64,6 +62,7 @@ gem 'delayed_job', '~> 4.0.0'
64 62
 gem 'delayed_job_active_record', :git => 'https://github.com/cantino/delayed_job_active_record', :branch => 'configurable-reserve-sql-strategy'
65 63
 gem 'devise', '~> 3.4.0'
66 64
 gem 'dotenv-rails', '~> 2.0.1'
65
+gem 'em-http-request', '~> 1.1.2'
67 66
 gem 'faraday', '~> 0.9.0'
68 67
 gem 'faraday_middleware'
69 68
 gem 'feed-normalizer'

+ 20 - 0
Gemfile.lock

@@ -1,4 +1,14 @@
1 1
 GIT
2
+  remote: git://github.com/cantino/twitter-stream.git
3
+  revision: f7e7edb0bae013bffabf3598e7147773d9fd370f
4
+  branch: huginn
5
+  specs:
6
+    twitter-stream (0.1.15)
7
+      eventmachine (~> 1.0.7)
8
+      http_parser.rb (~> 0.6.0)
9
+      simple_oauth (~> 0.3.0)
10
+
11
+GIT
2 12
   remote: git://github.com/cantino/weibo_2.git
3 13
   revision: 00e57d29d8252126014b038cd738b02e05e4cfc5
4 14
   branch: master
@@ -138,6 +148,14 @@ GEM
138 148
       hashie
139 149
       multi_json
140 150
       oauth
151
+    em-http-request (1.1.2)
152
+      addressable (>= 2.3.4)
153
+      cookiejar
154
+      em-socksify (>= 0.3)
155
+      eventmachine (>= 1.0.3)
156
+      http_parser.rb (>= 0.6.0)
157
+    em-socksify (0.3.0)
158
+      eventmachine (>= 1.0.0.beta.4)
141 159
     em-websocket (0.5.1)
142 160
       eventmachine (>= 0.12.9)
143 161
       http_parser.rb (~> 0.6.0)
@@ -518,6 +536,7 @@ DEPENDENCIES
518 536
   devise (~> 3.4.0)
519 537
   dotenv-rails (~> 2.0.1)
520 538
   dropbox-api
539
+  em-http-request (~> 1.1.2)
521 540
   faraday (~> 0.9.0)
522 541
   faraday_middleware
523 542
   feed-normalizer
@@ -579,6 +598,7 @@ DEPENDENCIES
579 598
   tumblr_client
580 599
   twilio-ruby (~> 3.11.5)
581 600
   twitter (~> 5.14.0)
601
+  twitter-stream!
582 602
   typhoeus (~> 0.6.3)
583 603
   tzinfo (>= 1.2.0)
584 604
   tzinfo-data

+ 2 - 3
app/concerns/long_runnable.rb

@@ -70,8 +70,7 @@ module LongRunnable
70 70
         rescue SignalException, SystemExit
71 71
           stop!
72 72
         rescue StandardError => e
73
-          message = "Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
74
-          STDERR.puts "\n#{message}\n\n"
73
+          message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
75 74
           agent.error(message)
76 75
         end
77 76
       end
@@ -89,7 +88,7 @@ module LongRunnable
89 88
       if respond_to?(:stop)
90 89
         stop
91 90
       else
92
-        @thread.terminate
91
+        thread.terminate
93 92
       end
94 93
     end
95 94
 

+ 66 - 39
app/models/agents/twitter_stream_agent.rb

@@ -140,7 +140,8 @@ module Agents
140 140
           end
141 141
         end
142 142
 
143
-        config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] } << oauth_token
143
+        config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] }
144
+        config_hash.push(oauth_token)
144 145
 
145 146
         Worker.new(id: agents.first.worker_id(config_hash),
146 147
                    config: {filter_to_agent_map: filter_to_agent_map},
@@ -154,58 +155,84 @@ module Agents
154 155
       SEPARATOR = /[^\w_\-]+/
155 156
 
156 157
       def setup
157
-        @timeout = 0
158
+        require 'twitter/json_stream'
159
+        @filter_to_agent_map = @config[:filter_to_agent_map]
158 160
       end
159 161
 
160 162
       def run
161
-        recent_tweets = []
162
-        filter_to_agent_map = @config[:filter_to_agent_map]
163
-
164
-        stream!(filter_to_agent_map.keys, @agent) do |status|
165
-          if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
166
-            puts "Skipping retweet: #{status["text"]}"
167
-          elsif recent_tweets.include?(status["id_str"])
168
-            puts "Skipping duplicate tweet: #{status["text"]}"
169
-          else
170
-            recent_tweets << status["id_str"]
171
-            recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
172
-            puts status["text"]
173
-            filter_to_agent_map.keys.each do |filter|
174
-              if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
175
-                filter_to_agent_map[filter].each do |agent|
176
-                  puts " -> #{agent.name}"
177
-                  agent.process_tweet(filter, status)
178
-                end
179
-              end
180
-            end
163
+        @recent_tweets = []
164
+        EventMachine.run do
165
+          stream!(@filter_to_agent_map.keys, @agent) do |status|
166
+            handle_status(status)
181 167
           end
182 168
         end
169
+        Thread.stop
170
+      end
171
+
172
+      def stop
173
+        EventMachine.stop_event_loop if EventMachine.reactor_running?
174
+        thread.terminate
183 175
       end
184 176
 
185 177
       private
186 178
       def stream!(filters, agent, &block)
187 179
         filters = filters.map(&:downcase).uniq
188 180
 
189
-        method = (filters && filters.length > 0) ? [:filter, track: filters.map {|f| CGI::escape(f) }.join(",")] : [:sample]
190
-        client.send(*method) do |tweet|
191
-          @timeout = 0
192
-          return unless tweet.class == Twitter::Tweet
193
-          status = ActiveSupport::HashWithIndifferentAccess.new(tweet.to_h)
194
-          status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
195
-          yield status
181
+        stream = Twitter::JSONStream.connect(
182
+          :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
183
+          :ssl     => true,
184
+          :oauth   => {
185
+            :consumer_key    => agent.twitter_consumer_key,
186
+            :consumer_secret => agent.twitter_consumer_secret,
187
+            :access_key      => agent.twitter_oauth_token,
188
+            :access_secret   => agent.twitter_oauth_token_secret
189
+          }
190
+        )
191
+
192
+        stream.each_item do |status|
193
+          block.call(status)
194
+        end
195
+
196
+        stream.on_error do |message|
197
+          STDERR.puts " --> Twitter error: #{message} <--"
198
+        end
199
+
200
+        stream.on_no_data do |message|
201
+          STDERR.puts " --> Got no data for awhile; trying to reconnect."
202
+          stop
203
+        end
204
+
205
+        stream.on_max_reconnects do |timeout, retries|
206
+          STDERR.puts " --> Oops, tried too many times! <--"
207
+          sleep 60
208
+          stop
196 209
         end
197
-      rescue Twitter::Error => e
198
-        @timeout += 60
199
-        puts "Twitter raised '#{e.class}', sleeping for #{@timeout} seconds"
200
-        sleep @timeout
201 210
       end
202 211
 
203
-      def client
204
-        @client ||= Twitter::Streaming::Client.new do |config|
205
-          config.consumer_key        = @agent.twitter_consumer_key
206
-          config.consumer_secret     = @agent.twitter_consumer_secret
207
-          config.access_token        = @agent.twitter_oauth_token
208
-          config.access_token_secret = @agent.twitter_oauth_token_secret
212
+      def handle_status(status)
213
+        status = JSON.parse(status) if status.is_a?(String)
214
+        return unless status
215
+        return if status.has_key?('delete')
216
+        return unless status['text']
217
+        status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
218
+
219
+        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
220
+          puts "Skipping retweet: #{status["text"]}"
221
+          return
222
+        elsif @recent_tweets.include?(status["id_str"])
223
+          puts "Skipping duplicate tweet: #{status["text"]}"
224
+          return
225
+        end
226
+
227
+        @recent_tweets << status["id_str"]
228
+        @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH
229
+        puts status["text"]
230
+        @filter_to_agent_map.keys.each do |filter|
231
+          next unless (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
232
+          @filter_to_agent_map[filter].each do |agent|
233
+            puts " -> #{agent.name}"
234
+            agent.process_tweet(filter, status)
235
+          end
209 236
         end
210 237
       end
211 238
     end

+ 1 - 2
lib/agent_runner.rb

@@ -16,7 +16,7 @@ class AgentRunner
16 16
     @mutex = Mutex.new
17 17
     @scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
18 18
 
19
-    @scheduler.every 5 do
19
+    @scheduler.every 1 do
20 20
       restart_dead_workers if @running
21 21
     end
22 22
 
@@ -39,7 +39,6 @@ class AgentRunner
39 39
     run_workers
40 40
 
41 41
     while @running
42
-      #puts "r"
43 42
       if signal = @signal_queue.shift
44 43
         handle_signal(signal)
45 44
       end

+ 74 - 54
spec/models/agents/twitter_stream_agent_spec.rb

@@ -168,84 +168,104 @@ describe Agents::TwitterStreamAgent do
168 168
       @mock_agent = mock!
169 169
       @config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
170 170
       @worker = Agents::TwitterStreamAgent::Worker.new(@config)
171
+      @worker.instance_variable_set(:@recent_tweets, [])
171 172
       @worker.setup
172 173
     end
173 174
 
174 175
     context "#run" do
175
-      it "calls the agent to process the tweet" do
176
-        stub.instance_of(IO).puts
177
-        mock(@mock_agent).name { 'mock' }
178
-        mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
179
-        mock(@worker).stream!(['agent'], @agent).yields({'text' => 'agent'})
180
-
176
+      it "starts the stream" do
177
+        mock(EventMachine).run.yields
178
+        mock(@worker).stream!(['agent'], @agent)
179
+        mock(Thread).stop
181 180
         @worker.run
182 181
       end
183
-      it "skips retweets" do
184
-        mock.instance_of(IO).puts('Skipping retweet: retweet')
185
-        mock(@worker).stream!(['agent'], @agent).yields({'retweeted_status' => {'' => true}, 'text' => 'retweet'})
186 182
 
183
+      it "yields received tweets" do
184
+        mock(EventMachine).run.yields
185
+        mock(@worker).stream!(['agent'], @agent).yields('status' => 'hello')
186
+        mock(@worker).handle_status('status' => 'hello')
187
+        mock(Thread).stop
187 188
         @worker.run
188 189
       end
190
+    end
189 191
 
190
-      it "deduplicates tweets" do
191
-        mock.instance_of(IO).puts("dup")
192
-        mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
193
-        # RR does not support multiple yield calls
194
-        class DoubleYield < Agents::TwitterStreamAgent::Worker
195
-          def stream!(_, __, &block)
196
-            yield({'text' => 'dup'})
197
-            yield({'text' => 'dup'})
198
-          end
199
-        end
200
-        worker = DoubleYield.new(@config)
201
-
202
-        worker.run
192
+    context "#stop" do
193
+      it "stops the thread" do
194
+        mock(@worker.thread).terminate
195
+        @worker.stop
203 196
       end
204 197
     end
205 198
 
206
-    context "#stream!" do
207
-      before(:each) do
208
-        @client_mock = mock!
209
-        stub(@worker).client { @client_mock }
199
+    context "stream!" do
200
+      def stub_without(method = nil)
201
+        stream_stub = stub!
202
+        stream_stub.each_item if method != :each_item
203
+        stream_stub.on_error if method != :on_error
204
+        stream_stub.on_no_data if method != :on_no_data
205
+        stream_stub.on_max_reconnects if method != :on_max_reconnects
206
+        stub(Twitter::JSONStream).connect { stream_stub }
207
+        stream_stub
210 208
       end
211 209
 
212
-      it "calls the sample method without filters" do
213
-        @client_mock.sample
214
-        @worker.send(:stream!, [], @mock_agent)
210
+      it "initializes Twitter::JSONStream" do
211
+        mock(Twitter::JSONStream).connect({:path=>"/1/statuses/filter.json?track=agent",
212
+                                           :ssl=>true, :oauth=>{:consumer_key=>"twitteroauthkey",
213
+                                           :consumer_secret=>"twitteroauthsecret",
214
+                                           :access_key=>"1234token",
215
+                                           :access_secret=>"56789secret"}
216
+                                          }) { stub_without }
217
+        @worker.send(:stream!, ['agent'], @agent)
215 218
       end
216 219
 
217
-      it "calls the filter method when filters are provided" do
218
-        @client_mock.filter(track: 'filter')
219
-        @worker.send(:stream!, ['filter'], @mock_agent)
220
-      end
220
+      context "callback handling" do
221
+        it "logs error messages" do
222
+          stub_without(:on_error).on_error.yields('woups')
223
+          mock(STDERR).puts(" --> Twitter error: woups <--")
224
+          @worker.send(:stream!, ['agent'], @agent)
225
+        end
226
+
227
+        it "stop when no data was received"do
228
+          stub_without(:on_no_data).on_no_data.yields
229
+          mock(@worker).stop
230
+          mock(STDERR).puts(" --> Got no data for awhile; trying to reconnect.")
231
+          @worker.send(:stream!, ['agent'], @agent)
232
+        end
233
+
234
+        it "sleeps for 60 seconds on_max_reconnects" do
235
+          stub_without(:on_max_reconnects).on_max_reconnects.yields
236
+          mock(STDERR).puts(" --> Oops, tried too many times! <--")
237
+          mock(@worker).sleep(60)
238
+          mock(@worker).stop
239
+          @worker.send(:stream!, ['agent'], @agent)
240
+        end
221 241
 
222
-      it "only handles instances of Twitter::Tweet" do
223
-        @client_mock.sample.yields(Object.new)
224
-        expect { |blk| @worker.send(:stream!, [], @mock_agent, &blk) }.not_to yield_control
242
+        it "yields every status received" do
243
+          stub_without(:each_item).each_item.yields({'text' => 'hello'})
244
+          @worker.send(:stream!, ['agent'], @agent) do |status|
245
+            expect(status).to eq({'text' => 'hello'})
246
+          end
247
+        end
225 248
       end
249
+    end
226 250
 
227
-      it "yields Hashes for received Twitter:Tweet instances" do
228
-        @client_mock.sample.yields(Twitter::Tweet.new(id: '1234', text: 'test'))
229
-        expect { |blk| @worker.send(:stream!, [], @mock_agent, &blk) }.to yield_with_args({'id' => '1234', 'text' => 'test'})
251
+    context "#handle_status" do
252
+      it "skips retweets" do
253
+        mock.instance_of(IO).puts('Skipping retweet: retweet')
254
+        @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}})
230 255
       end
231 256
 
232
-      it "it backs of 60 seconds for every Twitter::Error::TooManyRequests exception rescued" do
233
-        stub.instance_of(IO).puts
234
-        mock(@worker).sleep(60)
235
-        @client_mock.sample { raise Twitter::Error::TooManyRequests }
236
-        @worker.send(:stream!, [], @mock_agent)
237
-        @client_mock.sample { raise Twitter::Error::TooManyRequests }
238
-        mock(@worker).sleep(120)
239
-        @worker.send(:stream!, [], @mock_agent)
257
+      it "deduplicates tweets" do
258
+        mock.instance_of(IO).puts("dup")
259
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
260
+        mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
261
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
240 262
       end
241
-    end
242 263
 
243
-    context "#client" do
244
-      it "initializes the client" do
245
-        client = @worker.send(:client)
246
-        expect(client).to be_a(Twitter::Streaming::Client)
247
-        expect(client.access_token).to eq('1234token')
248
-        expect(client.access_token_secret).to eq('56789secret')
264
+      it "calls the agent to process the tweet" do
265
+        stub.instance_of(IO).puts
266
+        mock(@mock_agent).name { 'mock' }
267
+        mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
268
+        @worker.send(:handle_status, {'text' => 'agent'})
249 269
       end
250 270
     end
251 271
   end